[SPARK-39347][SS] Bug fix for time window calculation when event time < 0 #39843

WweiL wants to merge 9 commits into apache:master from WweiL/SPARK-38069-time-window-fix
Conversation
cc @HeartSaVioR

@WweiL
HeartSaVioR left a comment:

Only nits. Thanks for working on this!
> `val df5 = Seq(`
nit: is this test case testing a different edge case than the one above? Otherwise let's remove it.
Right, this is actually testing another case: below I used year 1968, not 1969. The 1969 test case covers a window whose start is less than 0 but whose end is greater than 0. I added this case to test when both the start and the end are less than 0.
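The two cases can be sketched with epoch seconds (a hypothetical illustration, not code from the PR; `lastStart` mirrors the corrected window-start idea, and the timestamps, offsets, and slide values are made up for the example):

```scala
import java.time.Instant

// Hypothetical helper: last aligned window start at or before ts, correcting
// for Scala's truncated remainder, which is negative for pre-epoch timestamps.
def lastStart(ts: Long, offset: Long, slide: Long): Long = {
  val rem = (ts - offset) % slide
  if (rem < 0) ts - rem - slide else ts - rem
}

// 1969 case: the window straddles the epoch (start < 0 < end).
val t1969 = Instant.parse("1969-12-31T23:59:58Z").getEpochSecond // -2
val s1969 = lastStart(t1969, 5L, 10L)                            // window [-5, 5)

// 1968 case: the whole window lies before the epoch (start < end < 0).
val t1968 = Instant.parse("1968-12-31T00:00:02Z").getEpochSecond
val s1968 = lastStart(t1968, 5L, 10L)

assert(s1969 < 0L && s1969 + 10L > 0L)
assert(s1968 < 0L && s1968 + 10L < 0L)
```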
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala (outdated; resolved)
Looks good. Should we back-port this to 3.2? I can try that.

Thanks! That'd be very helpful! I can't access my laptop right now.

SPARK-38069 was merged in Spark 3.3. Porting it back to master/3.4/3.3 would be enough.
HeartSaVioR left a comment:

+1

I don't feel like co-author credit is mandatory, as the original fix was also something he borrowed from Flink. Mentioning the credit in the PR description seems sufficient.
Thanks! Merging to master/3.4/3.3.
… < 0

### What changes were proposed in this pull request?

I tried to understand what was introduced in #36737, made the code more readable, and added some tests. Many thanks to nyingping!

The change in #35362 introduced a bug when the `timestamp` is less than 0, i.e. before `1970-01-01 00:00:00 UTC`: for some windows, Spark returns a wrong `windowStart` time. The root cause of this bug is how the modulo operator (`%`) works with negative numbers. For example:

```
scala> 1 % 3
res0: Int = 1

scala> -1 % 3
res1: Int = -1 // Mathematically it should be 2 here
```

This led to a wrong calculated `windowStart`. For a concrete example:

```
 * Example calculation:
 * For simplicity assume windowDuration = slideDuration.
 * | x x x x x x x x x x x x | x x x x x x x x x x x x | x x x x x x x x x x x x |
 * |                         |----l1 ----|---- l2 -----|
 * lastStart            timestamp                  lastStartWrong
 * Normally when timestamp > startTime (or equally remainder > 0), we get
 * l1 = remainder = (timestamp - startTime) % slideDuration, lastStart = timestamp - remainder
 * However, when timestamp < startTime (or equally remainder < 0), the value of remainder is
 * -l2 (note the negative sign), and lastStart is then at the position of lastStartWrong.
 * So we need to subtract a slideDuration.
```

### Why are the changes needed?

This is a bug fix. Example from the original PR #36737; here `df3` and `df4` have times before 1970, so `timestamp < 0`.
```scala
val df3 = Seq(
  ("1969-12-31 00:00:02", 1),
  ("1969-12-31 00:00:12", 2)).toDF("time", "value")
val df4 = Seq(
  (LocalDateTime.parse("1969-12-31T00:00:02"), 1),
  (LocalDateTime.parse("1969-12-31T00:00:12"), 2)).toDF("time", "value")

Seq(df3, df4).foreach { df =>
  checkAnswer(
    df.select(window($"time", "10 seconds", "10 seconds", "5 seconds"), $"value")
      .orderBy($"window.start".asc)
      .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"),
    Seq(
      Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1),
      Row("1969-12-31 00:00:05", "1969-12-31 00:00:15", 2))
  )
}
```

Without the change this test would fail with:

```
== Results ==
!== Correct Answer - 2 ==                     == Spark Answer - 2 ==
!struct<>                                     struct<CAST(window.start AS STRING):string,CAST(window.end AS STRING):string,value:int>
![1969-12-30 23:59:55,1969-12-31 00:00:05,1]  [1969-12-31 00:00:05,1969-12-31 00:00:15,1]
![1969-12-31 00:00:05,1969-12-31 00:00:15,2]  [1969-12-31 00:00:15,1969-12-31 00:00:25,2]
```

Notice how this is shifted by one `slideDuration`. It should start with `[1969-12-30 23:59:55,1969-12-31 00:00:05,1]` but Spark returns `[1969-12-31 00:00:05,1969-12-31 00:00:15,1]`, right-shifted by one `slideDuration` (10 seconds).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

### Benchmark results:

1.
Burak's original implementation:

```
[info] Apple M1 Max
[info] tumbling windows:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version                 10            17         14      962.7          1.0      1.0X
[info] Running benchmark: sliding windows
[info] Running case: burak version
[info] Stopped after 16 iterations, 10604 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:   Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] burak version                646           663         19       15.5         64.6      1.0X
```

2. Current implementation (buggy):

```
[info] Running benchmark: tumbling windows
[info] Running case: current - buggy
[info] Stopped after 637 iterations, 10008 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] tumbling windows:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy               10            16         12     1042.7          1.0      1.0X
[info] Running benchmark: sliding windows
[info] Running case: current - buggy
[info] Stopped after 16 iterations, 10143 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:   Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] current - buggy              617           634         10       16.2         61.7      1.0X
```

3.
Proposed change in this PR:

```
[info] Apple M1 Max
[info] tumbling windows:  Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change               10            16         11      981.2          1.0      1.0X
[info] Running benchmark: sliding windows
[info] Running case: purposed change
[info] Stopped after 18 iterations, 10122 ms
[info] OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Mac OS X 12.5.1
[info] Apple M1 Max
[info] sliding windows:   Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] purposed change              548           562         19       18.3         54.8      1.0X
```

Note that I ran the benchmarks separately: if you run them sequentially, the later one always gets a performance gain, presumably because the JVM has already warmed up.

Closes #39843 from WweiL/SPARK-38069-time-window-fix.

Lead-authored-by: Wei Liu <wei.liu@databricks.com>
Co-authored-by: nieyingping <nieyingping@alphadata.com.cn>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 87d4eb6)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
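As a side note on the root cause: Scala's `%` follows Java's truncated division, so the remainder takes the sign of the dividend. A floored modulus such as `java.lang.Math.floorMod` yields the mathematically expected residue. This is only an illustration of the arithmetic, not code from the PR:

```scala
// Truncated division: the remainder takes the sign of the dividend.
assert(1 % 3 == 1)
assert(-1 % 3 == -1)              // not the mathematical residue 2

// Floored modulus: result is always in [0, n) for a positive divisor n.
assert(Math.floorMod(-1, 3) == 2)

// A manual correction of the truncated remainder achieves the same effect:
val remainder = -1 % 3
val corrected = if (remainder < 0) remainder + 3 else remainder
assert(corrected == 2)
```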
There is a merge conflict in branch-3.3. @WweiL Would you mind creating a backport PR for this? Thanks!
I happened to browse the original author's blog and saw this PR. I think your statement is incorrect: even if the fix was a reference to Flink, it was still a kind of discovery, and co-author credit is a form of affirmation. The submitter is maintaining a harmonious environment; there is no need to deny it. Of course, I'm just expressing my thoughts. Have a good life!
### What changes were proposed in this pull request?

I tried to understand what was introduced in #36737, made the code more readable, and added some tests. Many thanks to @nyingping!

The change in #35362 introduced a bug when the `timestamp` is less than 0, i.e. before `1970-01-01 00:00:00 UTC`: for some windows, Spark returns a wrong `windowStart` time. The root cause of this bug is how the modulo operator (`%`) works with negative numbers. For example, `-1 % 3` evaluates to `-1` in Scala, while mathematically the residue is `2`. This led to a wrong calculated `windowStart`.

### Why are the changes needed?
This is a bug fix.
Example from the original PR #36737: here `df3` and `df4` have times before 1970, so `timestamp < 0`. Without the change, the test would fail with results shifted by one `slideDuration`. It should start with `[1969-12-30 23:59:55,1969-12-31 00:00:05,1]` but Spark returns `[1969-12-31 00:00:05,1969-12-31 00:00:15,1]`, right-shifted by one `slideDuration` (10 seconds).

### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test.
### Benchmark results:
Note that I ran the benchmarks separately: if you run them sequentially, the later one always gets a performance gain, presumably because the JVM has already warmed up.
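The expected rows in the `df3` test can also be checked with plain epoch-second arithmetic (a hypothetical sketch; the variable names and the `Math.floorMod` formulation are illustrative, not code from the PR):

```scala
// Window parameters from the test: 10s duration, 10s slide, 5s start offset.
val slide  = 10L
val offset = 5L

// "1969-12-31 00:00:02 UTC" is one day minus 2 seconds before the epoch.
val ts = -86400L + 2L // -86398

// Last window start <= ts: subtract the floored remainder, which is always
// in [0, slide), so pre-epoch timestamps are handled correctly.
val start = ts - Math.floorMod(ts - offset, slide)
val end   = start + 10L

// -86405 s = 1969-12-30 23:59:55 UTC and -86395 s = 1969-12-31 00:00:05 UTC,
// matching Row("1969-12-30 23:59:55", "1969-12-31 00:00:05", 1) above.
assert(start == -86405L && end == -86395L)
```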